#include "lsv_rcache.h"
#include "lsv_log.h"
#include "lsv_volume.h"
#include "lsv_wbuffer_internal.h"
#include "volume_proto.h"
#include "list.h"

int lsv_rcache_init(lsv_volume_proto_t * volume_proto){
        int ret = 0;
        lsv_rcache_t * rcache = NULL;

        rcache = (lsv_rcache_t *)malloc(sizeof(lsv_rcache_t));
        if(NULL == rcache){
                /*print log*/
                ret = -ENOMEM;
                return ret;
        }
        memset(rcache, 0, sizeof(lsv_rcache_t));
        INIT_LIST_HEAD(&rcache->page_lru);
        INIT_LIST_HEAD(&rcache->chunk_lru);
        INIT_LIST_HEAD(&rcache->l2_lru);
        INIT_LIST_HEAD(&rcache->io_list);

        rcache->page_hash_index = NULL;
        rcache->page_chunk_hash_index = NULL;
        rcache->chunk_hash_index = NULL;
        rcache->l2_chunk_hash_index = NULL;

        rcache->l2_chunk_num = 0;
        rcache->chunk_num = 0;
        rcache->page_num = 0;
        rcache->io_num = 0;

        ltable_init(&rcache->lba_ltable, "rcache_lba");
        ltable_init(&rcache->chunk_id_ltable, "rcache_chunk_id");
        ltable_init(&rcache->l2_chunk_id_ltable, "rcache_l2_chunk_id");

        co_cond_init(&rcache->lba_cond);
        co_cond_init(&rcache->chunk_id_cond);
        co_cond_init(&rcache->l2_cond);

        volume_proto->rcache = (void *)rcache;
        lsv_rcache_hash_index_init(volume_proto);
        lsv_rwlock_init(&rcache->rwlock);

        return ret;
}

/*chunk_id == LSV_CHUNK_NULL, clean all chunks*/
int lsv_rcache_chunk_clean(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id){

        DINFO("mem_page:chunk_id %u clean......\n", chunk_id);
        lsv_rcache_page_hash_index_regc_with_chunkid(volume_proto, chunk_id);

        if(LSV_RCACHE_CHUNK_NUM_MAX > 0){
                DINFO("mem_chunk: chunk_id %u clean......\n", chunk_id);
                lsv_rcache_chunk_hash_index_regc(volume_proto, chunk_id);
        }

        if(LSV_RCACHE_CHUNK_NUM_MAX_L2 > 0){
                DINFO("l2_cache: chunk_id %u clean......\n", chunk_id);
                lsv_rcache_l2_cache_hash_index_regc(volume_proto, chunk_id);
        }

        return 0;
}


int lsv_rcache_page_clean(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_off){

        lsv_rcache_page_hash_index_regc(volume_proto, chunk_id, chunk_off);

        /*暂时将失效页对应的chunk同时设为失效，回收chunk_hash_index和l2_cache_hash_index*/
        if(LSV_RCACHE_CHUNK_NUM_MAX > 0){
                lsv_rcache_chunk_hash_index_regc(volume_proto, chunk_id);
        }

        if(LSV_RCACHE_CHUNK_NUM_MAX_L2 > 0){
                lsv_rcache_l2_cache_hash_index_regc(volume_proto, chunk_id);
        }

        return 0;
}


int lsv_rcache_clean_all(lsv_volume_proto_t * volume_proto){
        int ret = 0;

        lsv_rcache_hash_index_regc_all(volume_proto);

        return ret;
}

/**/
int lsv_rcache_release(lsv_volume_proto_t * volume_proto){
        int ret = 0;
        lsv_rcache_t * rcache = NULL;
        lsv_rcache_page_unit * pu = NULL;
        lsv_rcache_chunk_unit *cu = NULL;
        lsv_rcache_chunk_l2_unit *l2u = NULL;
        lsv_rcache_iotrace_unit *riu = NULL;
        struct list_head *pos, *next;

        rcache = (lsv_rcache_t *)volume_proto->rcache;

        list_for_each_safe(pos, next, &rcache->page_lru){
                pu = list_entry(pos, lsv_rcache_page_unit, page_lru_hook);
                list_del_init(&pu->page_lru_hook);
                free(pu);
        }
        free(rcache->page_hash_index);

        list_for_each_safe(pos, next, &rcache->chunk_lru){
                cu = list_entry(pos, lsv_rcache_chunk_unit, chunk_lru_hook);
                list_del_init(&cu->chunk_lru_hook);
                free(cu);
        }
        free(rcache->chunk_hash_index);

        list_for_each_safe(pos, next, &rcache->l2_lru){
                l2u = list_entry(pos, lsv_rcache_chunk_l2_unit, l2_lru_hook);
                list_del_init(&l2u->l2_lru_hook);
                free(l2u);
        }
        free(rcache->l2_chunk_hash_index);

        list_for_each_safe(pos, next, &rcache->io_list){
                riu = list_entry(pos, lsv_rcache_iotrace_unit, list);
                list_del_init(&riu->list);
                free(riu);
        }

        ltable_destroy(&rcache->lba_ltable);
        ltable_destroy(&rcache->chunk_id_ltable);
        ltable_destroy(&rcache->l2_chunk_id_ltable);

        rcache->page_hash_index = NULL;
        rcache->page_chunk_hash_index = NULL;
        rcache->chunk_hash_index = NULL;
        rcache->l2_chunk_hash_index = NULL;

        rcache->l2_chunk_num = 0;
        rcache->chunk_num = 0;
        rcache->page_num = 0;
        rcache->io_num = 0;

        lsv_rwlock_destroy(&rcache->rwlock);
        free(volume_proto->rcache);
        volume_proto->rcache = NULL;
        return ret;
}

int lsv_rcache_clean(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id){
        DINFO("clean chunk_id %u......\n", chunk_id);

        if(chunk_id == LSV_CHUNK_NULL){
                return lsv_rcache_clean_all(volume_proto);
        }else{
                return lsv_rcache_chunk_clean(volume_proto, chunk_id);
        }
}

int lsv_rcache_page_lookup(lsv_volume_proto_t *volume_proto, lsv_u64_t off, lsv_s32_t size, buffer_t *append_buf){
        int ret = 0;
        lsv_rcache_t *rcache = (lsv_rcache_t *)volume_proto->rcache;
        lsv_u64_t lba = off - off % LSV_PAGE_SIZE;

#ifdef __NO_RCACHE__
        return 0;
#endif

        ltable_rdlock(&rcache->lba_ltable, lba);
        ret = lsv_rcache_page_hash_index_lookup(volume_proto, 0 , 0, off, size, append_buf);
        ltable_unlock(&rcache->lba_ltable, lba);
        DINFO("read off %llu size %d ret %d......\n", (LLU)off, size, ret);
        if(ret == size){
                volume_proto->read_rc_hit++;
                rcache->page_hit_cnt ++;
                DERROR("volume: hit-miss: <%llu %llu> -------rcache: page_hit - chunk_hit - l2_hit - page_load - chunk_load - l2_swap: <%llu %llu %llu %llu %llu %llu>\n", (LLU)volume_proto->read_rc_hit, (LLU)volume_proto->read_rc_miss, (LLU)rcache->page_hit_cnt, (LLU)rcache->chunk_hit_cnt, (LLU)rcache->l2_hit_cnt, (LLU)rcache->page_load, (LLU)rcache->chunk_load, (LLU)rcache->l2_swap);
                return ret;
        }
        return ret;
}

/**
 *
 * @param volume_proto
 * @param chunk_id
 * @param chunk_offset
 * @param off
 * @param size
 * @param rio  NULL | valid raw_io_t
 * @param append_buf
 * @return
 */
// TODO page替换策略是否将lru变得更复杂一些，讲每个pu增加引用计数hit_count，当该pu位于队列尾部要被换出时，
// 如果该pu->hit_count>1,则将pu->hit_count减1，放到队列头部，当hit_count为1时且在队列尾部，则被换出
int lsv_rcache_lookup(lsv_volume_proto_t *volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_offset,
                lsv_u64_t off, lsv_s32_t size, const raw_io_t *rio,
                buffer_t *append_buf){
        int ret = 0;

        lsv_rcache_t *rcache = (lsv_rcache_t *)volume_proto->rcache;
#ifdef __NO_CACHE__
        lsv_u64_t lba = off - off % LSV_PAGE_SIZE;
        DINFO("load page %llu, lookup data lba %llu chunk_id %u chunk_off %d size %d\n", (LLU)lba, (LLU)off, chunk_id, chunk_offset, size);
        ltable_wrlock(rcache->lba_ltable, lba);
        ret = lsv_rcache_load_page(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
        ltable_unlock(rcache->lba_ltable, lba);
        return ret;
#endif
        DINFO("lba %llu chunk_id %u chunk_offset %d size %d\n", (LLU)off, chunk_id, chunk_offset, size);
        ret = lsv_rcache_hash_index_lookup(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
        if(ret == size){
                volume_proto->read_rc_hit++;
                DERROR("volume: hit-miss: <%llu %llu> -------rcache: page_hit - chunk_hit - l2_hit - page_load - chunk_load - l2_swap: <%llu %llu %llu %llu %llu %llu>\n", (LLU)volume_proto->read_rc_hit, (LLU)volume_proto->read_rc_miss, (LLU)rcache->page_hit_cnt, (LLU)rcache->chunk_hit_cnt, (LLU)rcache->l2_hit_cnt, (LLU)rcache->page_load, (LLU)rcache->chunk_load, (LLU)rcache->l2_swap);
                return ret;
        }

        if (lsv_rcache_chunk_load_judge(volume_proto->rcache, off, chunk_id, chunk_offset, size, rio)) {
                //LSV_RCACHE_CHUNK_NUM_MAX >0,若LSV_RCACHE_CHUNK_NUM_MAX_L2>0,则包含swap过程，swap过程没有yield过程，故只针对cu加锁
                //LSV_RCACHE_CHUNK_NUM_MAX_L2>0,即LSV_RCACHE_CHUNK_NUM_MAX=0,此时，只有l2_cache过程，需要针对l2u加锁
                if(LSV_RCACHE_CHUNK_NUM_MAX > 0) {
                        DINFO("load chunk %llu, lookup data lba %llu chunk_off %d size %d\n", (LLU)chunk_id, (LLU)off, chunk_offset, size);
                        ltable_wrlock(&rcache->chunk_id_ltable, chunk_id);
                        ret = lsv_rcache_load_chunk(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                        ltable_unlock(&rcache->chunk_id_ltable, chunk_id);
                }else if(LSV_RCACHE_CHUNK_NUM_MAX_L2 > 0){
                        DINFO("l2_cache load chunk %u, lookup data lba %llu chunk_off %d size %d\n", chunk_id, (LLU)off, chunk_offset, size);
                        ltable_wrlock(&rcache->l2_chunk_id_ltable, chunk_id);
                        ret = lsv_rcache_lookup_l2_cache(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                        ltable_unlock(&rcache->l2_chunk_id_ltable, chunk_id);
                }
        } else {
                lsv_u64_t lba = off - off % LSV_PAGE_SIZE;
                DINFO("load page %llu, lookup data lba %llu chunk_id %u chunk_off %d size %d\n", (LLU)lba, (LLU)off, chunk_id, chunk_offset, size);
                ltable_wrlock(&rcache->lba_ltable, lba);
                ret = lsv_rcache_load_page(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                ltable_unlock(&rcache->lba_ltable, lba);
        }
        DERROR("volume: hit-miss: <%llu %llu> -------rcache: page_hit - chunk_hit - l2_hit - page_load - chunk_load - l2_swap: <%llu %llu %llu %llu %llu %llu>\n", (LLU)volume_proto->read_rc_hit, (LLU)volume_proto->read_rc_miss, (LLU)rcache->page_hit_cnt, (LLU)rcache->chunk_hit_cnt, (LLU)rcache->l2_hit_cnt, (LLU)rcache->page_load, (LLU)rcache->chunk_load, (LLU)rcache->l2_swap);
        return ret;
}

int lsv_rcache_lookup2(lsv_volume_proto_t *volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_offset,
                lsv_u64_t off, lsv_s32_t size, raw_io_t *rio,
                buffer_t *append_buf){
        int ret = 0;
        lsv_rcache_t *rcache = (lsv_rcache_t *)volume_proto->rcache;
        DINFO("lba %llu chunk_id %u chunk_offset %d size %d\n", (LLU)off, chunk_id, chunk_offset, size);
        lsv_u64_t toff;
        lsv_s32_t tsize;
        for(toff = off; toff < off + size;){
                tsize = LSV_PAGE_SIZE - toff % LSV_PAGE_SIZE;
                ret = lsv_rcache_hash_index_lookup(volume_proto, chunk_id,
                                chunk_offset + toff - off,
                                toff,
                                tsize,
                                append_buf);
                if(ret == tsize){
                        volume_proto->read_rc_hit++;
                        toff += tsize;
                }else{
                        break;
                }
        }

        if(toff >= off + size){
                return size;
        }

        rio->size = rio->size - (toff - off);

        ret = lsv_rcache_chunk_load_judge(volume_proto->rcache, toff, chunk_id, chunk_offset + toff - off, off + size - toff, rio);
        if(2 == ret){
                ltable_wrlock(&rcache->chunk_id_ltable, chunk_id);
                ret = lsv_rcache_load_pages(volume_proto, chunk_id, chunk_offset + toff - off,
                                toff, off + size - toff, append_buf);
                ltable_unlock(&rcache->chunk_id_ltable, chunk_id);
                return size;
        }else if (1 == ret) {
                if(LSV_RCACHE_CHUNK_NUM_MAX > 0) {
                        ltable_wrlock(&rcache->chunk_id_ltable, chunk_id);
                        ret = lsv_rcache_load_chunk(volume_proto, chunk_id, chunk_offset + toff - off,
                                        toff, off + size - toff, append_buf);
                        ltable_unlock(&rcache->chunk_id_ltable, chunk_id);
                }else if(LSV_RCACHE_CHUNK_NUM_MAX_L2 > 0){
                        ltable_wrlock(&rcache->l2_chunk_id_ltable, chunk_id);
                        ret = lsv_rcache_lookup_l2_cache(volume_proto,
                                        chunk_id,
                                        chunk_offset + toff - off,
                                        toff,
                                        off - toff,
                                        append_buf);
                        ltable_unlock(&rcache->l2_chunk_id_ltable, chunk_id);
                }
                return size;
        } else if (0 == ret){
                lsv_u64_t lba = off - off % LSV_PAGE_SIZE;
                ltable_wrlock(&rcache->lba_ltable, lba);
                ret = lsv_rcache_load_page(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                ltable_unlock(&rcache->lba_ltable, lba);
        }
        return ret;
}
